### Classify individuals into one-time movers and non-movers.
### Extract quarterly financial-distress measures for movers and non-movers.


# Load packages
import calendar
import os
import sys
import shutil
import subprocess
import pandas as pd
import numpy  as np
from pyspark     import SparkContext, SparkConf
from pyspark.sql import SQLContext
from SparkDataTools import *




# spark = SparkContext()
# spark.setLogLevel("ERROR")
# sqlContext = SQLContext(spark) 


# Directories: Spark scratch space, this script's code directory, CSV output.
WORKDIR = '/geo_debt/geo/Spark'
CODEDIR = '/geo_debt/geo/Code/4_Spark/'
OUTDIR = '/geo_debt/geo/Output/Table'

##################################################
### Hardcodes
##################################################

# Panel window: quarterly snapshots from Q3 2000 through Q3 2016.
# Both ends fall in the same calendar month (startM), so the panel holds four
# snapshots per elapsed year plus the first one: 16 * 4 + 1 = 65.
startY = 2000
endY = 2016
startM = 9
numQ = (endY - startY) * 4 + 1

# Stamps of the first and last snapshot; they are spliced into the SQL and
# compared against start_t / end_t (presumably yyyymm strings — see yyyymm()).
startT = yyyymm(startY, startM)
endT = yyyymm(endY, startM)

# zip -> county / cz / state crosswalk, addressed directly as a parquet path in SQL.
CW = 'parquet.`/geo_debt/geo/cw/cw_zip_cz`'

# Work from the Spark scratch directory (CODEDIR is restored at the end of the run).
os.chdir(WORKDIR)


            
##################################################
### Functions of SQL codes       
##################################################

### get customer id, age, zipcode and save temp table
def get_customer_header(y, m, tempN='customer'):
    """Write one row per customer in snapshot (y, m) to parquet WORKDIR/tempN.

    Output columns are subjectkey, zip, birthDate, start_t, end_t, num_t and
    score, with start_t = end_t = yyyymm(y, m) and num_t = 1. This is the same
    layout get_mover reads, so the result can be merged into the mover table.

    For y > 2008 the headers snapshot (zip, birthDate) is inner-joined to the
    customs snapshot (score) on subjectkey, and rows without a birthDate are
    dropped. For earlier years only the customs snapshot is read: its zipcode
    is used, and birthDate is filled with the placeholder 10000101. The
    placeholder loses to any real birthdate once MAX(birthDate) is taken per
    customer in selection_mvr.
    """
    stamp = yyyymm(y, m)

    if y <= 2008:
        # Older headers carry no age/zip information, so use customs alone.
        # NOTE(review): the original comment says "pre 2008", but 2008 itself
        # also takes this branch — confirm the intended cutoff.
        customs = table_setup(sqlContext, y, m, 'customs')
        customs.createOrReplaceTempView("customsData")
        query = ('SELECT subjectkey, zipcode as zip, 10000101 AS birthDate, '
                 '{0} AS start_t, {0} AS end_t, 1 AS num_t, score '
                 'FROM customsData ').format(stamp)
    else:
        # A few subject keys may not agree between headers and customs; the
        # inner join drops those.
        headers = table_setup(sqlContext, y, m, 'headers')
        headers.createOrReplaceTempView("headersData")
        customs = table_setup(sqlContext, y, m, 'customs')
        customs.createOrReplaceTempView("customsData")
        query = ('SELECT a.subjectkey, a.zip, a.birthDate, '
                 '{0} AS start_t, {0} AS end_t, 1 AS num_t, b.score '
                 'FROM headersData a INNER JOIN customsData b ON a.subjectkey=b.subjectkey '
                 'WHERE a.birthDate IS NOT NULL ').format(stamp)

    sqlContext.sql(query).write.mode('overwrite').parquet(os.path.join(WORKDIR, tempN))



### compare new customers vs existing customers, to figure out mover/non-movers
def get_mover(y, m, tempA='mover', tempB='customer_new'):
    """Fold snapshot (y, m) into the running customer x zip table.

    tempA is the running table: one row per customer x zip pair, holding the
    first (start_t) and last (end_t) snapshot it was seen in and the number of
    snapshots observed (num_t). tempB is the snapshot written by
    get_customer_header. The two are full-outer-joined on (subjectkey, zip):

    * pairs in both tables keep start_t, move end_t to this snapshot and
      increment num_t;
    * pairs only in tempA are carried over unchanged;
    * pairs only in tempB start a new row with start_t = end_t = this
      snapshot and num_t = 1.

    birthDate and score come from the snapshot whenever the pair appears in it.
    The merged result replaces tempA, and tempB is deleted.
    """
    running_dir = os.path.join(WORKDIR, tempA)
    snapshot_dir = os.path.join(WORKDIR, tempB)
    staging_dir = os.path.join(WORKDIR, tempA + 'temp')
    stamp = yyyymm(y, m)

    select_list = [
        'CASE WHEN a.subjectkey IS NULL THEN b.subjectkey ELSE a.subjectkey END AS subjectkey, ',
        'CASE WHEN a.zip        IS NULL THEN b.zip        ELSE a.zip        END AS zip       , ',
        'CASE WHEN a.subjectkey IS NULL THEN ' + stamp + '   ELSE a.start_t    END AS start_t   , ',
        'CASE WHEN b.subjectkey IS NULL THEN a.end_t      ELSE ' + stamp + '   END AS end_t     , ',
        # nested CASE for num_t: a single list item spread over two literals
        'CASE WHEN b.subjectkey IS NULL THEN a.num_t ELSE '
        'CASE WHEN a.subjectkey IS NULL THEN 1 ELSE a.num_t+1 END       END AS num_t     , ',
        'CASE WHEN b.subjectkey IS NULL THEN a.birthDate  ELSE b.birthDate  END AS birthDate,  ',
        'CASE WHEN b.subjectkey IS NULL THEN a.score      ELSE b.score      END AS score       ',
    ]
    query = ('SELECT ' + ''.join(select_list)
             + 'FROM parquet.`' + running_dir + '` a FULL OUTER JOIN parquet.`'
             + snapshot_dir + '` b ON a.subjectkey=b.subjectkey AND a.zip=b.zip ')

    # Write the merge to a staging directory first, then swap it into place.
    sqlContext.sql(query).write.mode('overwrite').parquet(staging_dir)
    shutil.rmtree(running_dir)
    os.rename(staging_dir, running_dir)
    shutil.rmtree(snapshot_dir)

    print('finished ' + stamp)



# clean up mover table and perform sample selection on mover and nonmovers
def selection_mvr(startT, endT, numQ, mvTab='mover', outTab='oncemv_nonmv'):
    """Collapse the customer x zip table to customers and select the analysis sample.

    Three steps, each written to parquet under WORKDIR:

    1. ``mv_customer``: join mvTab to the crosswalk CW on zip and collapse to
       one row per subjectkey. Each row carries the first/last snapshot
       stamps, age at the end of the panel (age_now), counts of zip / county /
       cz / state changes, and the number of snapshots observed (num_rec). It
       also carries origin geography (zip held at startT) and destination
       geography (a later zip still held at endT; NULL for non-movers).
    2. ``mv_nonmv``: keep customers observed in all numQ snapshots from startT
       to endT and aged 30-80 at the end of the panel. Drop county/cz movers
       whose zip rows overlap in time (see the comment on step 2).
    3. ``outTab``: keep non-movers (one zip) and customers seen at exactly two
       zips lying in different cz; move_t is the start of the later zip row.

    Args:
        startT, endT: stamps of the first and last snapshot, spliced into the
            SQL as-is. They are assumed to be yyyymm strings (the year is
            recovered as int(endT) // 100).
        numQ: number of snapshots from startT to endT inclusive.
        mvTab: input customer x zip table under WORKDIR (built by get_mover).
        outTab: output table under WORKDIR.

    ``mv_customer`` is deleted at the end; ``mv_nonmv`` is left in WORKDIR.
    """
    # Year of the last snapshot, for age at the end of the panel. It is derived
    # from endT rather than the global endY, so age_now always matches the
    # window the caller passed in. For the default window this is still 2016,
    # giving the same SQL as before.
    end_year = str(int(endT) // 100)

    ### 1. customer x zip rows -> one row per customer, with crosswalk geography
    # Origin (_o) = geography of the zip held at the first snapshot.
    # Destination (_d) = geography of a zip first seen later and still held at
    # the last snapshot.
    tempFn  = 'parquet.`' + os.path.join(WORKDIR, mvTab) + '`'

    sql2run = 'SELECT a.subjectkey, ' + \
              'MIN(a.start_t)   AS start_t,    MAX(a.start_t) AS new_start_t, ' + \
              'MIN(a.end_t)     AS old_end_t,  MAX(a.end_t)   AS end_t,  '      + \
              'MAX(a.birthDate) AS birthDate, '      + \
              end_year + ' - FLOOR(MAX(a.birthDate)/10000)   AS age_now, ' + \
              'COUNT(a.zip)-1               AS num_zip_mv,   ' + \
              'COUNT(DISTINCT c.county)-1   AS num_cnty_mv,  ' + \
              'COUNT(DISTINCT c.cz)-1       AS num_cz_mv,    ' + \
              'COUNT(DISTINCT c.statefip)-1 AS num_st_mv,    ' + \
              'SUM(a.num_t)                 AS num_rec,      ' + \
              'MAX(CASE WHEN a.start_t= ' + startT + ' THEN c.zip       ELSE NULL END) AS zip_o,  ' + \
              'MAX(CASE WHEN a.start_t= ' + startT + ' THEN c.county    ELSE NULL END) AS cnty_o, ' + \
              'MAX(CASE WHEN a.start_t= ' + startT + ' THEN c.cz        ELSE NULL END) AS cz_o,   ' + \
              'MAX(CASE WHEN a.start_t> ' + startT + ' AND a.end_t= ' + endT + ' THEN c.zip    ELSE NULL END) AS zip_d,  ' + \
              'MAX(CASE WHEN a.start_t> ' + startT + ' AND a.end_t= ' + endT + ' THEN c.county ELSE NULL END) AS cnty_d, ' + \
              'MAX(CASE WHEN a.start_t> ' + startT + ' AND a.end_t= ' + endT + ' THEN c.cz     ELSE NULL END) AS cz_d    ' + \
              'FROM ' + tempFn + ' a INNER JOIN ' + CW + ' c ON a.zip=c.zip ' + \
              'GROUP BY a.subjectkey'
    sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'mv_customer'))

    ### 2. continuously observed customers aged 30-80 at the end of the panel
    # A customer who moves from cz A to B (...) and back to A has both stays in
    # A folded into one zip row, so the number of moves is under-counted. Such
    # county/cz movers show overlapping rows (new_start_t <= old_end_t) and are
    # dropped.
    # age10 bins age_now by decade; age 80 is pooled into the 70 bin.
    tempFn  = 'parquet.`' + os.path.join(WORKDIR, 'mv_customer') + '`'

    sql2run = 'SELECT * , CASE WHEN age_now>=80 THEN 70 ELSE 10*FLOOR(age_now/10) END AS age10, num_zip_mv AS num_mv  FROM ' + tempFn + \
              ' WHERE start_t = ' + startT + ' AND end_t = ' + endT + ' AND num_rec= ' + str(numQ) + \
              ' AND NOT (num_cz_mv>=1   AND new_start_t<=old_end_t) ' + \
              ' AND NOT (num_cnty_mv>=1 AND new_start_t<=old_end_t) ' + \
              ' AND age_now BETWEEN 30 AND 80'
    sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, 'mv_nonmv'))

    ### 3. non-movers plus one-time movers whose single zip change crosses a cz
    tempFn  = 'parquet.`' + os.path.join(WORKDIR, 'mv_nonmv') + '`'

    sql2run = 'SELECT * , new_start_t AS move_t FROM ' + tempFn + ' WHERE num_mv<=1 AND num_cz_mv=num_mv '
    sqlContext.sql(sql2run).write.mode('overwrite').parquet(os.path.join(WORKDIR, outTab))

    # Remove the step-1 intermediate; mv_nonmv is intentionally left in place.
    shutil.rmtree(os.path.join(WORKDIR, 'mv_customer'))



# iteratively update master table
def append_tab(masterTab, newTab):
    """Append parquet table WORKDIR/newTab to WORKDIR/masterTab with UNION ALL.

    The union is written to a staging directory (masterTab + '_temp'), which
    then replaces masterTab. newTab is deleted afterwards. UNION ALL matches
    columns by position, so both tables must share the same column order.
    """
    master_dir, new_dir = (os.path.join(WORKDIR, name) for name in (masterTab, newTab))
    staging_dir = os.path.join(WORKDIR, masterTab + '_temp')

    query = ' SELECT * FROM parquet.`{0}` UNION ALL SELECT * FROM parquet.`{1}`'.format(master_dir, new_dir)
    sqlContext.sql(query).write.mode('overwrite').parquet(staging_dir)

    # Swap the consolidated table into place and drop the appended piece.
    shutil.rmtree(master_dir)
    os.rename(staging_dir, master_dir)
    shutil.rmtree(new_dir)



### Arrange once mover and non-mover data
def arrange_mvr(mvTab='oncemv_nonmv', outTab='match_sample_nodraw'):
    """Shape the selected sample (WORKDIR/mvTab) into the table used for credit look-ups.

    Keeps identifiers, age fields, origin geography and timing, and renames
    new_start_t to move_t. For non-movers (num_mv=0) whose destination is
    NULL, destination zip/county/cz are set to the origin, so every row carries
    an origin/destination pair. The result is written to WORKDIR/outTab.
    """
    source = 'parquet.`' + os.path.join(WORKDIR, mvTab) + '`'

    # One fall-back CASE per geography level. The right-justified names and
    # separators reproduce the spacing of the hand-aligned SQL exactly.
    fallbacks = []
    for geo, sep in (('zip', ', '), ('cnty', ', '), ('cz', '  ')):
        dest = (geo + '_d').rjust(6)
        origin = (geo + '_o').rjust(6)
        fallbacks.append(
            ' CASE WHEN {d} IS NULL AND num_mv=0 THEN {o} ELSE {d} END as {d}{s}'.format(
                d=dest, o=origin, s=sep))

    query = ('SELECT  subjectKey, birthDate, zip_o, cnty_o, cz_o, age10, age_now, '
             'num_mv, start_t, end_t, new_start_t AS move_t, '
             + ''.join(fallbacks) + 'FROM ' + source)
    sqlContext.sql(query).write.mode('overwrite').parquet(os.path.join(WORKDIR, outTab))


### Merge TU credit data into the sampled movers/non-movers at one snapshot
def get_mvr_sanpshot(y, m, outTab='for_mover_zip', snapshotTab='merge_temp', sampleTab='match_sample', groupbyMore=', cz_o, cz_d, age10 ', getIndividuals=False):
    """Aggregate the sample's credit outcomes at snapshot (y, m) into WORKDIR/outTab.

    data_snapshot writes this snapshot's outcome variables to
    WORKDIR/snapshotTab (it is passed filterTab=sampleTab, presumably to
    restrict it to the sample). It returns a select-list fragment of aggregate
    expressions, which is spliced into the final GROUP BY query.

    The sample table is joined to the snapshot on subjectKey, and the result
    is collapsed by num_mv x asOfDate x move_t plus the columns listed in
    groupbyMore. groupbyMore must be a string starting with ', '.

    With getIndividuals=True, ind_id is added to the grouping: movers keep
    their own subjectKey, while all non-movers share ind_id = 0 and are pooled.
    The scratch tables 'sample_snapshot' and snapshotTab are deleted at the end.
    """
    # Extract the standard TU outcome variables for this snapshot.
    aggSQL = data_snapshot(sqlContext, WORKDIR, y, m, outTab=snapshotTab, filterTab=sampleTab, snapShotData=False)

    sample_src = 'parquet.`' + os.path.join(WORKDIR, sampleTab) + '`'
    snap_src = 'parquet.`' + os.path.join(WORKDIR, snapshotTab) + '`'
    joined_dir = os.path.join(WORKDIR, 'sample_snapshot')

    ind_col = 'CASE WHEN a.num_mv=1 THEN b.subjectKey ELSE 0 END as ind_id, ' if getIndividuals else ''
    group_cols = groupbyMore + ', ind_id ' if getIndividuals else groupbyMore

    # Attach each sampled customer's outcomes plus two ages:
    #   age    - at the snapshot date (asOfDate)
    #   age_mv - at the move stamp (move_t)
    # age10 (decade bin of age at the end of the panel, used for sampling
    # non-movers) travels along through groupbyMore.
    # NOTE(review): the LEFT JOIN keeps sample members with no row in this
    # snapshot; their b.* columns (incl. asOfDate) are NULL and they form a
    # NULL-asOfDate group below — confirm that is intended.
    join_sql = ('SELECT a.num_mv, a.move_t ' + groupbyMore + ' , '
                + 'FLOOR((b.asOfDate  - a.birthDate)/10000)*1.0 AS age,    '
                + 'FLOOR(a.move_t/100 - a.birthDate /10000)*1.0 AS age_mv, '
                + ind_col
                + 'b.*  '
                + 'FROM ' + sample_src + ' a LEFT JOIN ' + snap_src + ' b '
                + 'ON a.subjectKey = b.subjectKey ')
    sqlContext.sql(join_sql).write.mode('overwrite').parquet(joined_dir)

    # Collapse to the level of variation:
    # mover status x snapshot date x move stamp x groupbyMore cells.
    collapse_sql = ('SELECT num_mv, asOfDate, move_t ' + group_cols + ' , '
                    + 'AVG(age) AS age, AVG(age_mv) AS age_mv, ' + aggSQL
                    + 'FROM parquet.`' + joined_dir + '`'
                    + ' GROUP BY num_mv, asOfDate, move_t' + group_cols)
    sqlContext.sql(collapse_sql).write.mode('overwrite').parquet(os.path.join(WORKDIR, outTab))

    # Drop the scratch tables.
    shutil.rmtree(joined_dir)
    shutil.rmtree(os.path.join(WORKDIR, snapshotTab))


### combine credit score across snapshots of each sampled mover and matched non-mover
def mvr_data_stack(outTab='for_mover_zip', sampleTab='match_sample', groupbyMore=', cz_o, cz_d, age10 ', getIndividuals=False):
    """Stack get_mvr_sanpshot output over the whole panel into WORKDIR/outTab.

    The first snapshot (startY, startM) seeds outTab. Then, for every year in
    startY..endY and every month returned by quarter_end_month(year), the
    snapshot is written to the scratch table 'mover_new' and appended to
    outTab with append_tab.

    NOTE(review): this assumes quarter_end_month(startY) does not return
    startM again; otherwise the first snapshot would be stacked twice.
    """
    common = dict(sampleTab=sampleTab, groupbyMore=groupbyMore, getIndividuals=getIndividuals)

    # Seed the master table with the first snapshot of the panel.
    get_mvr_sanpshot(startY, startM, outTab=outTab, **common)
    print('finished ' + yyyymm(startY, startM))

    for year in range(startY, endY + 1):
        for month in quarter_end_month(year):
            stamp = yyyymm(year, month)
            get_mvr_sanpshot(year, month, outTab='mover_new', **common)
            append_tab(outTab, 'mover_new')
            print('finished ' + stamp)


#########################################################
### Main
#########################################################


### 1: classify one-time movers and non-movers
# Seed the running customer x zip table ('mover') with the first snapshot,
# then fold in every later quarter-end snapshot.
get_customer_header(startY, startM, tempN='mover')

for y in range(startY, endY + 1):
    for m in quarter_end_month(y):
        get_customer_header(y, m, tempN='customer_new')
        get_mover(y, m, tempA='mover', tempB='customer_new')


### 2: select the sample of movers and non-movers and arrange it for look-ups
selection_mvr(startT, endT, numQ, mvTab='mover', outTab='oncemv_nonmv')
arrange_mvr(mvTab='oncemv_nonmv', outTab='match_sample_nodraw')


### 3: look up credit outcome variables for the sample, down to zip level
mvr_data_stack(
    outTab='for_mover',
    sampleTab='match_sample_nodraw',
    groupbyMore=', cz_o, cz_d, cnty_o, cnty_d, zip_o, zip_d, age10 ',
)


### 4: clean up the stacked output and export it to CSV
cleanOutput('mv_nmv', 'for_mover', OUTDIR, WORKDIR, sqlContext)

sqlContext.clearCache()
os.chdir(CODEDIR)
print(' !!!! SUCCESS !!!!')

